Ensembles & Pipelines

python
datacamp
machine learning
pyspark
ensemble
pipeline
Author

kakamana

Published

April 11, 2023

Finally, you will learn how to improve the efficiency of your models. By using pipelines, you will make your code clearer and easier to maintain. To test your models more effectively and select good model parameters, you will use cross-validation. Last but not least, you will experiment with two types of ensemble models.

This Ensembles & Pipelines chapter is part of the DataCamp course Machine Learning with PySpark. Spark is a powerful, general-purpose tool for working with large data sets. It transparently distributes compute tasks across a cluster, so operations are fast and you can focus on the analysis rather than on technical details. The course teaches you how to get data into Spark and then covers three fundamental Spark machine learning topics: linear regression, logistic regression and other classifiers, and pipelines. Along the way you will analyze a large dataset of flight delays and a collection of spam text messages. With this background, you will be able to harness the power of Spark and apply it to your own machine learning projects.

This is my learning experience of data science through DataCamp. These repository contributions are part of my learning journey through my graduate program, the Master of Applied Data Science (MADS) at the University of Michigan, as well as DeepLearning.AI, Coursera & DataCamp. You can find similar articles & more stories on my Medium & LinkedIn profiles. I am also available on Kaggle and through my GitHub blogs & repos. Thank you for your motivation, support & valuable feedback.

These include projects, coursework & notebooks from my data science journey. They are created for reproducibility & future reference only. All source code, slides and screenshots are the intellectual property of their respective content authors. If you find this content beneficial, kindly consider a learning subscription from DeepLearning.AI, Coursera or DataCamp.

Code
import pyspark
from pyspark.sql import SparkSession

import pandas as pd
import numpy as np

Pipeline

Flight duration model - Pipeline stages

You’re going to create the stages for the flights duration model pipeline. You will use these in the next exercise to build a pipeline and to create a regression model.

Code
spark = SparkSession.builder.master('local[*]').appName('flights').getOrCreate()

# Read data from CSV file
flights = spark.read.csv('dataset/flights-larger.csv', sep=',', header=True, inferSchema=True,
                         nullValue='NA')

# Get number of records
print("The data contain %d records." % flights.count())

# View the first five records
flights.show(5)

# Check column data types
print(flights.printSchema())
print(flights.dtypes)
The data contain 275000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: integer (nullable = true)

None
[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]
Code
from pyspark.sql.functions import round

# Convert 'mile' to 'km' and drop 'mile' column
flights = flights.withColumn('km', round(flights.mile * 1.60934, 0)).drop('mile')
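
As a quick sanity check of the conversion, the same arithmetic can be reproduced in plain Python for the five mile values shown in the preview above. This is only a sketch: Python's built-in round sends exact halves to the nearest even number, whereas Spark's round rounds halves up, although none of these five values falls on a half.

```python
MILES_TO_KM = 1.60934  # same factor as in the Spark code above


def miles_to_km(miles):
    """Convert miles to kilometres, rounded to the nearest whole km."""
    return float(round(miles * MILES_TO_KM))


# 'mile' values from the five rows printed by flights.show(5)
miles = [157, 466, 738, 2248, 386]
km = [miles_to_km(m) for m in miles]
print(km)  # [253.0, 750.0, 1188.0, 3618.0, 621.0]
```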
Code
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Convert categorical strings to index values
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# One-hot encode index values
onehot = OneHotEncoder(
    inputCols=['org_idx', 'dow'],
    outputCols=['org_dummy', 'dow_dummy']
)

# Assemble predictors into a single column
assembler = VectorAssembler(inputCols=['km', 'org_dummy', 'dow_dummy'], outputCol='features')

# A linear regression object
regression = LinearRegression(labelCol='duration')

Flight duration model: Pipeline model

You’re now ready to put those stages together in a pipeline.

You’ll construct the pipeline and then train the pipeline on the training data. This will apply each of the individual stages in the pipeline to the training data in turn. None of the stages will be exposed to the testing data at all: there will be no leakage!

Once the entire pipeline has been trained it will then be used to make predictions on the testing data.
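
To make the "no leakage" point concrete, here is a minimal plain-Python sketch of the fit/transform pattern. The classes Centerer, RangeScaler and MiniPipeline are hypothetical stand-ins written for this illustration, not Spark classes; the point is only that every learned quantity comes from the data passed to fit().

```python
class Centerer:
    """Hypothetical stage: learns a mean in fit() and subtracts it in transform()."""

    def fit(self, values):
        self.mean = sum(values) / len(values)
        return self

    def transform(self, values):
        return [v - self.mean for v in values]


class RangeScaler:
    """Hypothetical stage: learns the largest absolute value and divides by it."""

    def fit(self, values):
        self.scale = max(abs(v) for v in values) or 1.0
        return self

    def transform(self, values):
        return [v / self.scale for v in values]


class MiniPipeline:
    """Fits each stage on the output of the previous stage, then chains transforms."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, values):
        for stage in self.stages:
            values = stage.fit(values).transform(values)
        return self

    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values


train = [100.0, 200.0, 300.0]
test = [1000.0]  # deliberately extreme: it must not influence what fit() learns

pipe = MiniPipeline([Centerer(), RangeScaler()]).fit(train)
print(pipe.stages[0].mean)   # 200.0, learned from the training values only
print(pipe.transform(test))  # [8.0]
```

The test value is transformed with the mean and scale learned from the training values, which is the behaviour the pipeline above relies on.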

Code
from pyspark.ml import Pipeline

flights_train, flights_test = flights.randomSplit([0.8, 0.2])

# Construct a pipeline
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# Train the pipeline on the training data
pipeline = pipeline.fit(flights_train)

# Make predictions on the test data
predictions = pipeline.transform(flights_test)
23/04/11 01:14:23 WARN Instrumentation: [8d3fcef9] regParam is zero, which might cause numerical instability and overfitting.
23/04/11 01:14:23 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/04/11 01:14:24 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK

SMS spam pipeline

You haven’t looked at the SMS data for quite a while. Last time we did the following:

  • split the text into tokens
  • removed stop words
  • applied the hashing trick
  • converted the data from counts to IDF and
  • trained a logistic regression model.

Each of these steps was done independently. This seems like a great application for a pipeline!
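
Before wiring these steps into a Spark pipeline, it can help to see what the first four of them compute. The sketch below is plain Python with several assumptions: the three messages and the stop-word list are made up, zlib.crc32 stands in for the hash function Spark uses, and the bucket count is tiny. The IDF formula, log((documents + 1) / (document frequency + 1)), is the one Spark's IDF documentation describes.

```python
import math
import zlib

STOP_WORDS = {"the", "a", "to", "is"}  # tiny illustrative list, not Spark's
NUM_BUCKETS = 16                       # kept small so the vectors are readable


def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()


def remove_stop_words(tokens):
    """Drop tokens that appear in the stop-word list."""
    return [t for t in tokens if t not in STOP_WORDS]


def hash_tf(tokens, num_buckets=NUM_BUCKETS):
    """Hashing trick: count tokens per hash bucket instead of per vocabulary word."""
    counts = [0] * num_buckets
    for t in tokens:
        counts[zlib.crc32(t.encode("utf-8")) % num_buckets] += 1
    return counts


def fit_idf(tf_vectors):
    """Inverse document frequency per bucket: log((n_docs + 1) / (doc_freq + 1))."""
    n_docs = len(tf_vectors)
    idf = []
    for j in range(len(tf_vectors[0])):
        doc_freq = sum(1 for vec in tf_vectors if vec[j] > 0)
        idf.append(math.log((n_docs + 1) / (doc_freq + 1)))
    return idf


docs = ["Win a free prize", "Call me later", "Free call to win"]  # made-up messages
tf_vectors = [hash_tf(remove_stop_words(tokenize(d))) for d in docs]
idf = fit_idf(tf_vectors)
tfidf = [[tf * w for tf, w in zip(vec, idf)] for vec in tf_vectors]
print(tfidf[0])
```

Because different tokens can land in the same bucket, the hashing trick trades a little accuracy for a fixed-size feature vector; a larger bucket count makes collisions rarer.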

Code
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Read data from CSV file
sms = spark.read.csv('dataset/sms.csv', sep=';', header=False, schema=schema, nullValue='NA')

sms.show(5)
+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|Sorry, I'll call ...|    0|
|  2|Dont worry. I gue...|    0|
|  3|Call FREEPHONE 08...|    1|
|  4|Win a 1000 cash p...|    1|
|  5|Go until jurong p...|    0|
+---+--------------------+-----+
only showing top 5 rows
Code
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression

# Break text into lowercase tokens, splitting on whitespace
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol=remover.getOutputCol(), outputCol='hash')
idf = IDF(inputCol=hasher.getOutputCol(), outputCol='features')

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf, logistic])

Cross-Validation

Cross validating simple flight duration model

You’ve already built a few models for predicting flight duration and evaluated them with a simple train/test split. However, cross-validation provides a much better way to evaluate model performance.

In this exercise you’re going to train a simple model for flight duration using cross-validation. Travel time is usually strongly correlated with distance, so using the km column alone should give a decent model.
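
The idea behind cross-validation can be sketched without Spark. The example below fits a one-variable least-squares line and averages the held-out RMSE over the folds. It is only an illustration: it uses the five km and duration pairs from the preview rows, and it assigns rows to folds round-robin, whereas Spark's CrossValidator assigns them randomly.

```python
import math


def fit_line(xs, ys):
    """Ordinary least squares for y = intercept + slope * x."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return mean_y - slope * mean_x, slope


def rmse(xs, ys, intercept, slope):
    """Root mean squared error of the line on the given points."""
    errors = [(intercept + slope * x - y) ** 2 for x, y in zip(xs, ys)]
    return math.sqrt(sum(errors) / len(errors))


def cross_validate(xs, ys, num_folds):
    """Average held-out RMSE over num_folds folds (round-robin fold assignment)."""
    scores = []
    for fold in range(num_folds):
        train = [i for i in range(len(xs)) if i % num_folds != fold]
        held_out = [i for i in range(len(xs)) if i % num_folds == fold]
        intercept, slope = fit_line([xs[i] for i in train], [ys[i] for i in train])
        scores.append(rmse([xs[i] for i in held_out],
                           [ys[i] for i in held_out], intercept, slope))
    return sum(scores) / len(scores)


# km and duration from the five preview rows; far too few for a real estimate
km = [253.0, 750.0, 1188.0, 3618.0, 621.0]
duration = [51, 102, 127, 365, 85]
avg_rmse = cross_validate(km, duration, num_folds=5)
print(avg_rmse)
```

Each row is used for evaluation exactly once and for training in the other folds, which is why the averaged score is a steadier estimate than a single train/test split.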

Code
assembler = VectorAssembler(inputCols=['km'], outputCol='features')

flights = assembler.transform(flights.drop('features'))

flights.show(5)
+---+---+---+-------+------+---+------+--------+-----+------+--------+
|mon|dom|dow|carrier|flight|org|depart|duration|delay|    km|features|
+---+---+---+-------+------+---+------+--------+-----+------+--------+
| 10| 10|  1|     OO|  5836|ORD|  8.18|      51|   27| 253.0| [253.0]|
|  1|  4|  1|     OO|  5866|ORD|  15.5|     102| null| 750.0| [750.0]|
| 11| 22|  1|     OO|  6016|ORD|  7.17|     127|  -19|1188.0|[1188.0]|
|  2| 14|  5|     B6|   199|JFK| 21.17|     365|   60|3618.0|[3618.0]|
|  5| 25|  3|     WN|  1675|SJC| 12.92|      85|   22| 621.0| [621.0]|
+---+---+---+-------+------+---+------+--------+-----+------+--------+
only showing top 5 rows
Code
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

flights_train, flights_test = flights.randomSplit([0.8, 0.2])

# Create an empty parameter grid
params = ParamGridBuilder().build()

# Create objects for building and evaluating a regression model
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# Create a cross validator
cv = CrossValidator(estimator=regression, estimatorParamMaps=params,
                    evaluator=evaluator, numFolds=5)

# Train and test model on multiple folds of the training data
cv = cv.fit(flights_train)

Cross validating flight duration model pipeline

The cross-validated model that you just built was simple, using km alone to predict duration.

Another important predictor of flight duration is the origin airport. Flights generally take longer to get into the air from busy airports. Let’s see if adding this predictor improves the model!

In this exercise you’ll add the org field to the model. However, since org is categorical, there’s more work to be done before it can be included: it must first be transformed to an index and then one-hot encoded before being assembled with km and used to build the regression model. We’ll wrap these operations up in a pipeline.
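
The two encoding steps can be sketched in plain Python. This is an approximation for illustration: it mirrors the defaults of StringIndexer (most frequent category gets index 0) and OneHotEncoder (the last category is dropped and represented by all zeros), but it returns dense lists rather than Spark's sparse vectors, and the sample of airport codes is made up.

```python
from collections import Counter


def fit_string_index(values):
    """Map each category to an index, most frequent first (ties in alphabetical order)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: idx for idx, category in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


# Made-up sample that reuses airport codes from the preview rows
orgs = ["ORD", "ORD", "ORD", "JFK", "SJC", "JFK"]
mapping = fit_string_index(orgs)
print(mapping)  # {'ORD': 0, 'JFK': 1, 'SJC': 2}

for org in ["ORD", "JFK", "SJC"]:
    print(org, one_hot(mapping[org], len(mapping)))
# ORD [1.0, 0.0]
# JFK [0.0, 1.0]
# SJC [0.0, 0.0]
```

Dropping the last category avoids a redundant column: if every other indicator is zero, the row must belong to the remaining category.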

Code
params = ParamGridBuilder().build()

# Create regression model
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# Create an indexer for the org field
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# Create a one-hot encoder for the indexed org field
onehot = OneHotEncoder(inputCol='org_idx', outputCol='org_dummy')

# Assemble the km and one-hot encoded fields
assembler = VectorAssembler(inputCols=['km', 'org_dummy'], outputCol='features')

# Create a pipeline and cross-validator
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=evaluator)

Ensemble

  • A collection of models
  • Wisdom of the crowd: the collective opinion of a group is better than that of a single expert
  • Random Forest
    • an ensemble of Decision Trees
    • Creating model diversity
      • each tree is trained on a random subset of the data
      • a random subset of features is used for splitting at each node
    • No two trees in the forest should be the same
  • Gradient-Boosted Trees
    • Iterative boosting algorithm:
      1. Build a Decision Tree and add it to the ensemble
      2. Predict the label for each training instance using the ensemble
      3. Compare predictions with known labels
      4. Emphasize training instances with incorrect predictions
      5. Return to step 1
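
The boosting loop above can be sketched in plain Python. This is a simplified illustration, assuming a numeric target, a single feature and squared-error loss, where "emphasizing" badly predicted instances amounts to fitting each new tree to the current residuals. The one-split stumps, the learning rate and the data are all made up for the example; Spark's GBTClassifier is considerably more elaborate.

```python
def fit_stump(xs, residuals):
    """One-split regression tree on a single feature, chosen to minimise squared error."""
    best = None
    for threshold in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = (sum((r - left_mean) ** 2 for r in left)
               + sum((r - right_mean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean


def boost(xs, ys, num_trees=20, learning_rate=0.5):
    """Return a predict(x) function backed by an ensemble of stumps."""
    base = sum(ys) / len(ys)  # start from the mean of the target
    trees = []

    def predict(x):
        return base + learning_rate * sum(tree(x) for tree in trees)

    for _ in range(num_trees):
        # Steps 2 and 3: predict with the current ensemble and compare with the labels
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        # Steps 4 and 1: the next tree concentrates on what is still predicted badly
        trees.append(fit_stump(xs, residuals))
    return predict


def mse(predict, xs, ys):
    """Mean squared error of a prediction function."""
    return sum((predict(x) - y) ** 2 for x, y in zip(xs, ys)) / len(xs)


# Made-up data with a jump between x = 4 and x = 5
xs = [1, 2, 3, 4, 5, 6, 7, 8]
ys = [1.0, 1.2, 0.9, 1.1, 3.0, 3.2, 2.9, 3.1]

mean_y = sum(ys) / len(ys)
baseline_mse = mse(lambda x: mean_y, xs, ys)
boosted_mse = mse(boost(xs, ys), xs, ys)
print(baseline_mse, boosted_mse)
```

On the training data the boosted error can only go down as trees are added, which is also why boosted ensembles need a held-out set or cross-validation to detect overfitting.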

Delayed flights with Gradient-Boosted Trees

You’ve previously built a classifier for flights likely to be delayed using a Decision Tree. In this exercise you’ll compare a Decision Tree model to a Gradient-Boosted Trees model.

Code
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

assembler = VectorAssembler(inputCols=['mon', 'depart', 'duration'], outputCol='features')
flights = assembler.transform(flights.drop('features'))
flights = flights.withColumn('label', (flights.delay >= 15).cast('integer'))
flights = flights.select('mon', 'depart', 'duration', 'features', 'label')
flights = flights.dropna()

flights.show(5)
+---+------+--------+-----------------+-----+
|mon|depart|duration|         features|label|
+---+------+--------+-----------------+-----+
| 10|  8.18|      51| [10.0,8.18,51.0]|    1|
| 11|  7.17|     127|[11.0,7.17,127.0]|    0|
|  2| 21.17|     365|[2.0,21.17,365.0]|    1|
|  5| 12.92|      85| [5.0,12.92,85.0]|    1|
|  3| 13.33|     182|[3.0,13.33,182.0]|    1|
+---+------+--------+-----------------+-----+
only showing top 5 rows
Code
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pprint import pprint

flights_train, flights_test = flights.randomSplit([0.8, 0.2])

# Create model objects and train on training data
tree = DecisionTreeClassifier().fit(flights_train)
gbt = GBTClassifier().fit(flights_train)

# Compare AUC on test data
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(tree.transform(flights_test))
evaluator.evaluate(gbt.transform(flights_test))

# List the trees in the ensemble and the relative importance of features
pprint(gbt.trees)
print(gbt.featureImportances)
                                                                                
[DecisionTreeRegressionModel: uid=dtr_da5cb5beddbf, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_9ba36e5b4550, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_b2a777f5a16c, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_209795a3a14f, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_b71c35fabc6b, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_cc236de45a82, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_19450b882bd5, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_515b0f246121, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_37eb7de293be, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_5e7c64f779f3, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_e275675a9245, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_7b15c8d4ff36, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_57fab16b6718, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_55e31beee3e3, depth=5, numNodes=61, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_8db1b1896898, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_013906547839, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_a7e676c3fe85, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_a586a5bf8967, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_3dd45367e24c, depth=5, numNodes=63, numFeatures=3,
 DecisionTreeRegressionModel: uid=dtr_a5a3ad28722b, depth=5, numNodes=63, numFeatures=3]
(3,[0,1,2],[0.3982045188114991,0.3047020752257714,0.29709340596272954])
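
The metric that BinaryClassificationEvaluator reports by default is the area under the ROC curve (AUC). One way to read it: the probability that a randomly chosen positive instance receives a higher score than a randomly chosen negative one. The sketch below computes that directly by comparing every positive with every negative; the labels and scores are made up, and this brute-force approach is meant for intuition rather than for large data.

```python
def auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0
               for p in positives for n in negatives)
    return wins / (len(positives) * len(negatives))


labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]  # made-up predicted probabilities of the positive class
print(auc(labels, scores))  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.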

Delayed flights with a Random Forest

In this exercise you’ll bring together cross-validation and ensemble methods. You’ll train a Random Forest classifier to predict delayed flights, using cross-validation to choose the best values for the model parameters.

You’ll find good values for the following parameters:

  • featureSubsetStrategy: the number of features to consider for splitting at each node
  • maxDepth: the maximum number of splits along any branch

Code
from pyspark.ml.classification import RandomForestClassifier

# Create a random forest classifier
forest = RandomForestClassifier()

# Create a parameter grid
params = ParamGridBuilder() \
        .addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2']) \
        .addGrid(forest.maxDepth, [2, 5, 10]) \
        .build()

# Create a binary classification evaluator
evaluator = BinaryClassificationEvaluator()

# Create a cross-validator
cv = CrossValidator(estimator=forest, estimatorParamMaps=params,
                    evaluator=evaluator, numFolds=5)
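
It is worth estimating how much work this cross-validator will do before fitting it. The plain-Python sketch below enumerates the same grid with itertools.product and counts the model fits: one per parameter combination per fold, plus a final refit of the best combination on all of the training data.

```python
from itertools import product

feature_subset_strategies = ['all', 'onethird', 'sqrt', 'log2']
max_depths = [2, 5, 10]
num_folds = 5

# Every combination of the two parameters, as ParamGridBuilder would enumerate them
grid = [{'featureSubsetStrategy': s, 'maxDepth': d}
        for s, d in product(feature_subset_strategies, max_depths)]

fits_during_cv = len(grid) * num_folds  # one forest per combination per fold
total_fits = fits_during_cv + 1         # the best combination is refit on all training data

print(len(grid), fits_during_cv, total_fits)  # 12 60 61
```

The count grows multiplicatively with every parameter added to the grid, so it pays to keep the lists of candidate values short.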

Evaluating Random Forest

In this final exercise you’ll be evaluating the results of cross-validation on a Random Forest model.

Code
cvModel = cv.fit(flights_train)

# Average AUC for each parameter combination in grid
avg_auc = cvModel.avgMetrics

# Average AUC for the best model
best_model_auc = max(avg_auc)

# What are the optimal parameter values?
opt_max_depth = cvModel.bestModel.explainParam('maxDepth')
opt_feat_substrat = cvModel.bestModel.explainParam('featureSubsetStrategy')

# AUC for best model on test data
best_auc = evaluator.evaluate(cvModel.transform(flights_test))
print(best_auc)
0.6838135218254684
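
Since avgMetrics is a plain list with one entry per parameter combination, in the same order as the grid, the winning combination can be found by pairing the two. The sketch below shows the idea in plain Python; the shortened grid and the metric values are placeholders invented for the illustration, not results from the run above. With a fitted model, the grid itself should be available from cvModel.getEstimatorParamMaps().

```python
# Placeholder grid and metrics, for illustration only
param_grid = [
    {'featureSubsetStrategy': 'all', 'maxDepth': 2},
    {'featureSubsetStrategy': 'all', 'maxDepth': 5},
    {'featureSubsetStrategy': 'sqrt', 'maxDepth': 2},
    {'featureSubsetStrategy': 'sqrt', 'maxDepth': 5},
]
avg_metrics = [0.61, 0.66, 0.62, 0.67]

# Index of the highest average AUC, then the matching parameter combination
best_index = max(range(len(avg_metrics)), key=avg_metrics.__getitem__)
best_params = param_grid[best_index]
print(best_params)  # {'featureSubsetStrategy': 'sqrt', 'maxDepth': 5}
```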